Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: filtered records holding up pipeline with destination batching #1987

Merged
merged 12 commits into from
Nov 26, 2024

Conversation

hariso
Copy link
Contributor

@hariso hariso commented Nov 22, 2024

Description

In #1986 we discovered an issue where, if there are records to be filtered and batching with a time delay is used, Conduit is waiting for the batch delay as soon as it encounters a filtered record.

The underlying issue is the following: when a processor node in a pipeline identifies a filtered record, then it goes on to acknowledge it. However, the acker node will not acknowledge it until the previous records have been acknowledged.

If batching is enabled, and the batch is not full, then Conduit is waiting for the batch to fill up, to flush it and ack the records. This creates a cyclic dependency: the filtered record is waiting for the previous records to be acked (the batch flushed), and the previous records are waiting for the filtered record to be acked, so that new records can come in and form a full batch.

This issue is solved by not nacking the filtered records in the processor. Instead, we let them pass through the pipeline until they reach the final node, the destination record, which simply acks them.

Fixes #1986.

Quick checks

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

Copy link
Member

@raulb raulb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good.

@maha-hajja maha-hajja marked this pull request as ready for review November 25, 2024 19:54
@maha-hajja maha-hajja requested a review from a team as a code owner November 25, 2024 19:54
@hariso hariso requested review from maha-hajja and raulb November 26, 2024 12:20
@hariso
Copy link
Contributor Author

hariso commented Nov 26, 2024

@raulb @maha-hajja Thanks for the review and the suggestions! I've updated the PR with a few more tests.

I've also tested it with a few pipeline:

  • file-to-file, without processor, without filter, with batching
  • file-to-file, with processor, no filter, with batching
  • file-to-file, with processor and filter, with batching
  • file-to-file, with processor and filter, with batching

I've tested all of them with and without batching (batch size + batch delay).

Copy link
Contributor

@maha-hajja maha-hajja left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work!

pkg/lifecycle/stream/destination_test.go Outdated Show resolved Hide resolved
pkg/lifecycle/stream/message.go Show resolved Hide resolved
hariso and others added 3 commits November 26, 2024 17:46
…ub.com:ConduitIO/conduit into haris/fix-filtered-records-holding-up-pipeline
@hariso hariso enabled auto-merge (squash) November 26, 2024 16:49
@hariso hariso merged commit 43fe0a1 into main Nov 26, 2024
3 checks passed
@hariso hariso deleted the haris/fix-filtered-records-holding-up-pipeline branch November 26, 2024 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Batching with delay and filtering is very slow
3 participants